Flink best practice: synchronizing MySQL data to TiDB using Canal 您所在的位置:网站首页 tidb canal Flink best practice: synchronizing MySQL data to TiDB using Canal

Flink best practice: synchronizing MySQL data to TiDB using Canal

2024-06-27 03:47| 来源: 网络整理| 查看: 265

Background introduction

This article will introduce how to import the data in MySQL into Kafka in the form of Binlog + Canal, and then be consumed by Flink.

In order to quickly verify the functionality of the whole process, all components are deployed in a single machine. If you have insufficient physical resources, you can build all the components in this article in a 4G 1U virtual machine environment.

If you need to deploy in a production environment, it is recommended to replace each component with a highly available cluster deployment scheme.

Among them, we have created a set of Zookeeper single node environment, which is shared by Flink, Kafka, Canal and other components.

For all components requiring JRE, such as Flink, Kafka, Canal and Zookeeper, considering that upgrading JRE may affect other applications, we choose each component to use its own JRE environment independently.

This paper is divided into two parts. The first seven sections mainly introduce the construction of the basic environment, and the last section introduces how the data flows in each component.

Data flows through the following components:

Generating Binlog from MySQL data source

The Canal reads the Binlog, generates the Canal json, and pushes it to the Topic specified by Kafka

Flink uses the Flink SQL connector Kafka API to consume data in the Kafka Topic

Flink writes data to TiDB through the Flink connector JDBC

The structure of TiDB + Flink supports the development and running of many different kinds of applications.

At present, the main features include:

Batch flow integration

Sophisticated state management

Event time support

Accurate primary state consistency guarantee

Flink can run on a variety of resource management frameworks including YARN, Mesos and Kubernetes. It also supports independent deployment on bare metal clusters. TiDB can be deployed on AWS, Kubernetes, GCP and gke. It also supports independent deployment on bare metal clusters using TiUP.

The common applications of TiDB + Flink structure are as follows:

Event driven applications

Anti fraud

anomaly detection

Rule based alarm

Business process monitoring

Data analysis application

Network quality monitoring

Product update and test evaluation analysis

Impromptu analysis of factual data

Large scale graph analysis

Data pipeline application

Construction of e-commerce real-time query index

E-commerce continuous ETL

Environment introduction

Operating system environment

[root@r20聽topology]#聽cat聽/etc/redhat-release CentOS聽Stream聽release聽8

software environment

Machine allocation

Deploy TiDB Cluster

Compared with the traditional stand-alone database, TiDB has the following advantages:

Pure distributed architecture, with good scalability and supporting elastic capacity expansion and contraction

It supports SQL, exposes the network protocol of MySQL, and is compatible with most MySQL syntax. It can directly replace MySQL in most scenarios

High availability is supported by default. When a few replicas fail, the database itself can automatically perform data repair and failover, which is transparent to the business

It supports ACID transactions and is friendly to some scenarios with strong consistent requirements, such as bank transfer

It has rich tool chain ecology, covering a variety of scenarios such as data migration, synchronization and backup

In terms of kernel design, TiDB distributed database divides the overall architecture into multiple modules, and each module communicates with each other to form a complete TiDB system. The corresponding architecture diagram is as follows:

In this article, we only do the simplest function test, so we deploy a set of single node but replica TiDB, which involves the following three modules:

TiDB Server: SQL layer, which exposes the connection endpoint of MySQL protocol. It is responsible for accepting client connections, performing SQL parsing and optimization, and finally generating distributed execution plans.

PD (Placement Driver) Server: the meta information management module of the whole TiDB cluster, which is responsible for storing the real-time data distribution of each TiKV node and the overall topology of the cluster, providing the TiDB Dashboard control interface, and assigning transaction ID s to distributed transactions.

TiKV Server: it is responsible for storing data. Externally, TiKV is a distributed key value storage engine that provides transactions.

TiUP deployment template file

#聽#聽Global聽variables聽are聽applied聽to聽all聽deployments聽and聽used聽as聽the聽default聽value聽of #聽#聽the聽deployments聽if聽a聽specific聽deployment聽value聽is聽missing. global: 聽聽user:聽"tidb" 聽聽ssh_port:聽22 聽聽deploy_dir:聽"/opt/tidb-c1/" 聽聽data_dir:聽"/opt/tidb-c1/data/" #聽#聽Monitored聽variables聽are聽applied聽to聽all聽the聽machines. #monitored: #聽聽node_exporter_port:聽19100 #聽聽blackbox_exporter_port:聽39115 #聽聽deploy_dir:聽"/opt/tidb-c3/monitored" #聽聽data_dir:聽"/opt/tidb-c3/data/monitored" #聽聽log_dir:聽"/opt/tidb-c3/log/monitored" #聽#聽Server聽configs聽are聽used聽to聽specify聽the聽runtime聽configuration聽of聽TiDB聽components. #聽#聽All聽configuration聽items聽can聽be聽found聽in聽TiDB聽docs: #聽#聽-聽TiDB:聽https://pingcap.com/docs/stable/reference/configuration/tidb-server/configuration-file/ #聽#聽-聽TiKV:聽https://pingcap.com/docs/stable/reference/configuration/tikv-server/configuration-file/ #聽#聽-聽PD:聽https://pingcap.com/docs/stable/reference/configuration/pd-server/configuration-file/ #聽#聽All聽configuration聽items聽use聽points聽to聽represent聽the聽hierarchy,聽e.g: #聽#聽聽聽readpool.storage.use-unified-pool #聽# #聽#聽You聽can聽overwrite聽this聽configuration聽via聽the聽instance-level聽`config`聽field. server_configs: 聽聽tidb: 聽聽聽聽log.slow-threshold:聽300 聽聽聽聽binlog.enable:聽false 聽聽聽聽binlog.ignore-error:聽false 聽聽聽聽tikv-client.copr-cache.enable:聽true 聽聽tikv: 聽聽聽聽server.grpc-concurrency:聽4 聽聽聽聽raftstore.apply-pool-size:聽2 聽聽聽聽raftstore.store-pool-size:聽2 聽聽聽聽rocksdb.max-sub-compactions:聽1 聽聽聽聽storage.block-cache.capacity:聽"16GB" 聽聽聽聽readpool.unified.max-thread-count:聽12 聽聽聽聽readpool.storage.use-unified-pool:聽false 聽聽聽聽readpool.coprocessor.use-unified-pool:聽true 聽聽聽聽raftdb.rate-bytes-per-sec:聽0 聽聽pd: 聽聽聽聽schedule.leader-schedule-limit:聽4 聽聽聽聽schedule.region-schedule-limit:聽2048 聽聽聽聽schedule.replica-schedule-limit:聽64 pd_servers: 聽聽-聽host:聽192.168.12.21 聽聽聽聽ssh_port:聽22 聽聽聽聽name:聽"pd-2" 聽聽聽聽client_port:聽12379 聽聽聽聽peer_port:聽12380 聽聽聽聽deploy_dir:聽"/opt/tidb-c1/pd-12379" 聽聽聽聽data_dir:聽"/opt/tidb-c1/data/pd-12379" 聽聽聽聽log_dir:聽"/opt/tidb-c1/log/pd-12379" 聽聽聽聽numa_node:聽"0" 聽聽聽聽#聽#聽The聽following聽configs聽are聽used聽to聽overwrite聽the聽`server_configs.pd`聽values. 聽聽聽聽config: 聽聽聽聽聽聽schedule.max-merge-region-size:聽20 聽聽聽聽聽聽schedule.max-merge-region-keys:聽200000 tidb_servers: 聽聽-聽host:聽192.168.12.21 聽聽聽聽ssh_port:聽22 聽聽聽聽port:聽14000 聽聽聽聽status_port:聽12080 聽聽聽聽deploy_dir:聽"/opt/tidb-c1/tidb-14000" 聽聽聽聽log_dir:聽"/opt/tidb-c1/log/tidb-14000" 聽聽聽聽numa_node:聽"0" 聽聽聽聽#聽#聽The聽following聽configs聽are聽used聽to聽overwrite聽the聽`server_configs.tidb`聽values. 聽聽聽聽config: 聽聽聽聽聽聽log.slow-query-file:聽tidb-slow-overwrited.log 聽聽聽聽聽聽tikv-client.copr-cache.enable:聽true tikv_servers: 聽聽-聽host:聽192.168.12.21 聽聽聽聽ssh_port:聽22 聽聽聽聽port:聽12160 聽聽聽聽status_port:聽12180 聽聽聽聽deploy_dir:聽"/opt/tidb-c1/tikv-12160" 聽聽聽聽data_dir:聽"/opt/tidb-c1/data/tikv-12160" 聽聽聽聽log_dir:聽"/opt/tidb-c1/log/tikv-12160" 聽聽聽聽numa_node:聽"0" 聽聽聽聽#聽#聽The聽following聽configs聽are聽used聽to聽overwrite聽the聽`server_configs.tikv`聽values. 聽聽聽聽config: 聽聽聽聽聽聽server.grpc-concurrency:聽4 聽聽聽聽聽聽#server.labels:聽{聽zone:聽"zone1",聽dc:聽"dc1",聽host:聽"host1"聽} #monitoring_servers: #聽聽-聽host:聽192.168.12.21 #聽聽聽聽ssh_port:聽22 #聽聽聽聽port:聽19090 #聽聽聽聽deploy_dir:聽"/opt/tidb-c1/prometheus-19090" #聽聽聽聽data_dir:聽"/opt/tidb-c1/data/prometheus-19090" #聽聽聽聽log_dir:聽"/opt/tidb-c1/log/prometheus-19090" #grafana_servers: #聽聽-聽host:聽192.168.12.21 #聽聽聽聽port:聽13000 #聽聽聽聽deploy_dir:聽"/opt/tidb-c1/grafana-13000" #alertmanager_servers: #聽聽-聽host:聽192.168.12.21 #聽聽聽聽ssh_port:聽22 #聽聽聽聽web_port:聽19093 #聽聽聽聽cluster_port:聽19094 #聽聽聽聽deploy_dir:聽"/opt/tidb-c1/alertmanager-19093" #聽聽聽聽data_dir:聽"/opt/tidb-c1/data/alertmanager-19093" #聽聽聽聽log_dir:聽"/opt/tidb-c1/log/alertmanager-19093"

TiDB Cluster environment

The focus of this article is not to deploy TiDB Cluster. As a rapid experimental environment, TiDB Cluster cluster with a single copy is deployed on only one machine. There is no need to deploy the monitoring environment.

[root@r20聽topology]#聽tiup聽cluster聽display聽tidb-c1-v409 Starting聽component聽`cluster`:聽/root/.tiup/components/cluster/v1.3.2/tiup-cluster聽display聽tidb-c1-v409 Cluster聽type:聽聽聽聽聽聽聽tidb Cluster聽name:聽聽聽聽聽聽聽tidb-c1-v409 Cluster聽version:聽聽聽聽v4.0.9 SSH聽type:聽聽聽聽聽聽聽聽聽聽聽builtin Dashboard聽URL:聽聽聽聽聽聽http://192.168.12.21:12379/dashboard ID聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Role聽聽Host聽聽聽聽聽聽聽聽聽聽聽Ports聽聽聽聽聽聽聽聽OS/Arch聽聽聽聽聽聽聽Status聽聽聽Data聽Dir聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Deploy聽Dir --聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽----聽聽----聽聽聽聽聽聽聽聽聽聽聽-----聽聽聽聽聽聽聽聽-------聽聽聽聽聽聽聽------聽聽聽--------聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽---------- 192.168.12.21:12379聽聽pd聽聽聽聽192.168.12.21聽聽12379/12380聽聽linux/x86_64聽聽Up|L|UI聽聽/opt/tidb-c1/data/pd-12379聽聽聽聽/opt/tidb-c1/pd-12379 192.168.12.21:14000聽聽tidb聽聽192.168.12.21聽聽14000/12080聽聽linux/x86_64聽聽Up聽聽聽聽聽聽聽-聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽/opt/tidb-c1/tidb-14000 192.168.12.21:12160聽聽tikv聽聽192.168.12.21聽聽12160/12180聽聽linux/x86_64聽聽Up聽聽聽聽聽聽聽/opt/tidb-c1/data/tikv-12160聽聽/opt/tidb-c1/tikv-12160 Total聽nodes:聽4

Create a table for testing

mysql>聽show聽create聽table聽t1; +-------+-------------------------------------------------------------------------------------------------------------------------------+ |聽Table聽|聽Create聽Table聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽| +-------+-------------------------------------------------------------------------------------------------------------------------------+ |聽t1聽聽聽聽|聽CREATE聽TABLE聽`t1`聽( 聽聽`id`聽int(11)聽NOT聽NULL, 聽聽PRIMARY聽KEY聽(`id`) )聽ENGINE=InnoDB聽DEFAULT聽CHARSET=utf8mb4聽COLLATE=utf8mb4_bin聽| +-------+-------------------------------------------------------------------------------------------------------------------------------+ 1聽row聽in聽set聽(0.00聽sec) Deploy Zookeeper environment

In this experiment, Zookeeper environment is configured separately to provide services for Kafka and Flink environments.

As an experimental demonstration scheme, only the stand-alone environment is deployed.

Unzip the Zookeeper package

[root@r24聽soft]#聽tar聽vxzf聽apache-zookeeper-3.6.2-bin.tar.gz [root@r24聽soft]#聽mv聽apache-zookeeper-3.6.2-bin聽/opt/zookeeper

Deploy jre for Zookeeper

[root@r24聽soft]#聽tar聽vxzf聽jre1.8.0_281.tar.gz [root@r24聽soft]#聽mv聽jre1.8.0_281聽/opt/zookeeper/jre

Modify / opt/zookeeper/bin/zkEnv.sh file to add JAVA_HOME environment variable

##聽add聽bellowing聽env聽var聽in聽the聽head聽of聽zkEnv.sh JAVA_HOME=/opt/zookeeper/jre

Create a profile for Zookeeper

[root@r24聽conf]#聽cat聽zoo.cfg聽|聽grep聽-v聽"#" tickTime=2000 initLimit=10 syncLimit=5 dataDir=/opt/zookeeper/data clientPort=2181

Start Zookeeper

[root@r24聽bin]#聽/opt/zookeeper/bin/zkServer.sh聽start

Check the status of Zookeeper

##聽check聽zk聽status [root@r24聽bin]#聽./zkServer.sh聽status ZooKeeper聽JMX聽enabled聽by聽default Using聽config:聽/opt/zookeeper/bin/../conf/zoo.cfg Client聽port聽found:聽2181.聽Client聽address:聽localhost.聽Client聽SSL:聽false. Mode:聽standalone ##聽check聽OS聽port聽status [root@r24聽bin]#聽netstat聽-ntlp Active聽Internet聽connections聽(only聽servers) Proto聽Recv-Q聽Send-Q聽Local聽Address聽聽聽聽聽聽聽聽聽聽聽Foreign聽Address聽聽聽聽聽聽聽聽聽State聽聽聽聽聽聽聽PID/Program聽name tcp聽聽聽聽聽聽聽聽0聽聽聽聽聽聽0聽0.0.0.0:22聽聽聽聽聽聽聽聽聽聽聽聽聽聽0.0.0.0:*聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽LISTEN聽聽聽聽聽聽942/sshd tcp6聽聽聽聽聽聽聽0聽聽聽聽聽聽0聽:::2181聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽:::*聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽LISTEN聽聽聽聽聽聽15062/java tcp6聽聽聽聽聽聽聽0聽聽聽聽聽聽0聽:::8080聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽:::*聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽LISTEN聽聽聽聽聽聽15062/java tcp6聽聽聽聽聽聽聽0聽聽聽聽聽聽0聽:::22聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽:::*聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽LISTEN聽聽聽聽聽聽942/sshd tcp6聽聽聽聽聽聽聽0聽聽聽聽聽聽0聽:::44505聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽:::*聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽LISTEN聽聽聽聽聽聽15062/java ##聽use聽zkCli聽tool聽to聽check聽zk聽connection [root@r24聽bin]#聽./zkCli.sh聽-server聽192.168.12.24:2181

Suggestions on Zookeeper

I personally have an immature suggestion about Zookeeper:

The Zookeeper cluster version must enable network monitoring.

In particular, pay attention to the network bandwidth in system metrics.

Deploy Kafka

Kafka is a distributed stream processing platform, which is mainly used in two types of applications:

A real-time stream data pipeline is constructed, which can reliably obtain data between systems or applications( Equivalent to message queue)

Build real-time streaming applications to transform or influence these streaming data( That is, flow processing, which changes internally between kafka stream topic and topic)

Kafka has four core API s:

The Producer API allows an application to publish a stream of data to one or more Kafka topic s.

The Consumer API allows an application to subscribe to one or more topic s and process the streaming data published to them.

The Streams API allows an application as a stream processor to consume input streams generated by one or more topics, and then produce an output stream to one or more topics for effective conversion in the input and output streams.

The Connector API allows to build and run reusable producers or consumers to connect Kafka topics to existing applications or data systems. For example, connect to a relational database and capture all changes to the table.

In this experiment, only functional verification is done, and only a stand-alone Kafka environment is built.

Download and unzip Kafka

[root@r22聽soft]#聽tar聽vxzf聽kafka_2.13-2.7.0.tgz [root@r22聽soft]#聽mv聽kafka_2.13-2.7.0聽/opt/kafka Deploy jre for Kafka [root@r22聽soft]#聽tar聽vxzf聽jre1.8.0_281.tar.gz [root@r22聽soft]#聽mv聽jre1.8.0_281聽/opt/kafka/jre

Modify Kafka's jre environment variable

[root@r22聽bin]#聽vim聽/opt/kafka/bin/kafka-run-class.sh ##聽add聽bellowing聽line聽in聽the聽head聽of聽kafka-run-class.sh JAVA_HOME=/opt/kafka/jre

Modify Kafka profile

modify聽Kafka聽configuration file聽/opt/kafka/config/server.properties ##聽change聽bellowing聽variable聽in聽/opt/kafka/config/server.properties broker.id=0 listeners=PLAINTEXT://192.168.12.22:9092 log.dirs=/opt/kafka/logs zookeeper.connect=i192.168.12.24:2181

Start Kafka

[root@r22聽bin]#聽/opt/kafka/bin/kafka-server-start.sh聽/opt/kafka/config/server.properties

View version information of Kafka

Kafka Not provided聽--version of optional To see Kafka Version information for. [root@r22聽~]#聽ll聽/opt/kafka/libs/聽|聽grep聽kafka -rw-r--r--聽1聽root聽root聽聽4929521聽Dec聽16聽09:02聽kafka_2.13-2.7.0.jar -rw-r--r--聽1聽root聽root聽聽聽聽聽聽821聽Dec聽16聽09:03聽kafka_2.13-2.7.0.jar.asc -rw-r--r--聽1聽root聽root聽聽聽聽41793聽Dec聽16聽09:02聽kafka_2.13-2.7.0-javadoc.jar -rw-r--r--聽1聽root聽root聽聽聽聽聽聽821聽Dec聽16聽09:03聽kafka_2.13-2.7.0-javadoc.jar.asc -rw-r--r--聽1聽root聽root聽聽聽892036聽Dec聽16聽09:02聽kafka_2.13-2.7.0-sources.jar -rw-r--r--聽1聽root聽root聽聽聽聽聽聽821聽Dec聽16聽09:03聽kafka_2.13-2.7.0-sources.jar.asc ...聽...

Where 2.13 is the version information of scale and 2.7.0 is the version information of Kafka.

Deploy Flink

Apache Flink is a framework and distributed processing engine for stateful computing on unbounded and bounded data streams. Flink can run in all common cluster environments and can calculate at memory speed and any size.

Apache Flink, a distributed processing framework supporting high throughput, low latency and high performance, is a framework and distributed processing engine for stateful computing of unbounded and bounded data streams. Flink is designed to run in all common cluster environments and perform calculations at memory execution speed and any size.

This experiment only does functional testing and only deploys the stand-alone Flink environment.

Download and distribute Flink

[root@r23聽soft]#聽tar聽vxzf聽flink-1.12.1-bin-scala_2.11.tgz [root@r23聽soft]#聽mv聽flink-1.12.1聽/opt/flink

Deploy jre for Flink

[root@r23聽soft]#聽tar聽vxzf聽jre1.8.0_281.tar.gz [root@r23聽soft]#聽mv聽jre1.8.0_281聽/opt/flink/jre

Add lib required by Flink

Flink聽consumption聽Kafka聽Data, required聽flink-sql-connector-kafka聽package Flink聽link聽MySQL/TiDB锛宯eed聽flink-connector-jdbc聽package [root@r23聽soft]#聽mv聽flink-sql-connector-kafka_2.12-1.12.0.jar聽/opt/flink/lib/ [root@r23聽soft]#聽mv聽flink-connector-jdbc_2.12-1.12.0.jar聽/opt/flink/lib/

Modify Flink profile

##聽add聽or聽modify聽bellowing聽lines聽in聽/opt/flink/conf/flink-conf.yaml jobmanager.rpc.address:聽192.168.12.23 env.java.home:聽/opt/flink/jre

Launch Flink

[root@r23聽~]#聽/opt/flink/bin/start-cluster.sh Starting聽cluster. Starting聽standalonesession聽daemon聽on聽host聽r23. Starting聽taskexecutor聽daemon聽on聽host聽r23.

View Flink GUI

Deploy MySQL

Unzip MySQL package

[root@r25聽soft]#聽tar聽vxf聽mysql-8.0.23-linux-glibc2.12-x86_64.tar.xz [root@r25聽soft]#聽mv聽mysql-8.0.23-linux-glibc2.12-x86_64聽/opt/mysql/

Create MySQL Service file

[root@r25聽~]#聽touch聽/opt/mysql/support-files/mysqld.service [root@r25聽support-files]#聽cat聽mysqld.service [Unit] Description=MySQL聽8.0聽database聽server After=syslog.target After=network.target [Service] Type=simple User=mysql Group=mysql #ExecStartPre=/usr/libexec/mysql-check-socket #ExecStartPre=/usr/libexec/mysql-prepare-db-dir聽%n #聽Note:聽we聽set聽--basedir聽to聽prevent聽probes聽that聽might聽trigger聽SELinux聽alarms, #聽per聽bug聽#547485 ExecStart=/opt/mysql/bin/mysqld_safe #ExecStartPost=/opt/mysql/bin/mysql-check-upgrade #ExecStopPost=/opt/mysql/bin/mysql-wait-stop #聽Give聽a聽reasonable聽amount聽of聽time聽for聽the聽server聽to聽start聽up/shut聽down TimeoutSec=300 #聽Place聽temp聽files聽in聽a聽secure聽directory,聽not聽/tmp PrivateTmp=true Restart=on-failure RestartPreventExitStatus=1 #聽Sets聽open_files_limit LimitNOFILE聽=聽10000 #聽Set聽enviroment聽variable聽MYSQLD_PARENT_PID.聽This聽is聽required聽for聽SQL聽restart聽command. Environment=MYSQLD_PARENT_PID=1 [Install] WantedBy=multi-user.target ##聽copy聽mysqld.service聽to聽/usr/lib/systemd/system/ [root@r25聽support-files]#聽cp聽mysqld.service聽聽/usr/lib/systemd/system/

Create my.cnf file

[root@r34聽opt]#聽cat聽/etc/my.cnf [mysqld] port=3306 basedir=/opt/mysql datadir=/opt/mysql/data socket=/opt/mysql/data/mysql.socket max_connections聽=聽100 default-storage-engine聽=聽InnoDB character-set-server=utf8 log-error聽=聽/opt/mysql/log/error.log slow_query_log聽=聽1 long-query-time聽=聽30 slow_query_log_file聽=聽/opt/mysql/log/show.log min_examined_row_limit聽=聽1000 log-slow-slave-statements log-queries-not-using-indexes #skip-grant-tables

Initialize and start MySQL

[root@r25聽~]#聽/opt/mysql/bin/mysqld聽--initialize聽--user=mysql聽--console [root@r25聽~]#聽chown聽-R聽mysql:mysql聽/opt/mysql [root@r25聽~]#聽systemctl聽start聽mysqld ##聽check聽mysql聽temp聽passord聽from聽/opt/mysql/log/error.log 2021-02-24T02:45:47.316406Z聽6聽[Note]聽[MY-010454]聽[Server]聽A聽temporary聽password聽is聽generated聽for聽root@localhost:聽I?nDjijxa3>-

Create a new MySQL user to connect to Canal

##聽change聽mysql聽temp聽password聽firstly mysql>聽alter聽user聽'root'@'localhost'聽identified聽by聽'mysql'; Query聽OK,聽0聽rows聽affected聽(0.00聽sec) ##聽create聽a聽management聽user聽'root'@'%' mysql>聽create聽user聽'root'@'%'聽identified聽by聽'mysql'; Query聽OK,聽0聽rows聽affected聽(0.01聽sec) mysql>聽grant聽all聽privileges聽on聽*.*聽to聽'root'@'%'; Query聽OK,聽0聽rows聽affected聽(0.00聽sec) ##聽create聽a聽canal聽replication聽user聽'canal'@'%' mysql>聽create聽user聽'canal'@'%'聽identified聽by聽'canal'; Query聽OK,聽0聽rows聽affected聽(0.01聽sec) mysql>聽grant聽select,聽replication聽slave,聽replication聽client聽on聽*.*聽to聽'canal'@'%'; Query聽OK,聽0聽rows聽affected聽(0.00聽sec) mysql>聽flush聽privileges; Query聽OK,聽0聽rows聽affected聽(0.00聽sec)

Create tables for testing in MySQL

mysql>聽show聽create聽table聽test.t2; +-------+----------------------------------------------------------------------------------+ |聽Table聽|聽Create聽Table聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽| +-------+----------------------------------------------------------------------------------+ |聽t2聽聽聽聽|聽CREATE聽TABLE聽`t2`聽( 聽聽`id`聽int聽DEFAULT聽NULL )聽ENGINE=InnoDB聽DEFAULT聽CHARSET=utf8聽| +-------+----------------------------------------------------------------------------------+ 1聽row聽in聽set聽(0.00聽sec) Deploy Canal

The main purpose of Canal is to provide incremental data subscription and consumption based on MySQL database incremental log parsing

In the early days, Alibaba had the business requirement of cross machine room synchronization due to the deployment of dual machine rooms in Hangzhou and the United States. The implementation method was mainly to obtain incremental changes based on the business trigger.

Since 2010, the business has gradually tried to obtain incremental changes through database log parsing for synchronization, resulting in a large number of database incremental subscriptions and consumption services.

Businesses based on log incremental subscription and consumption include:

database mirroring

Real time database backup

Index construction and real-time maintenance (splitting heterogeneous indexes, inverted indexes, etc.)

Service cache refresh

Incremental data processing with business logic

Current canal supports source MySQL versions, including 5.1. X, 5.5. X, 5.6. X, 5.7. X, and 8.0. X

Unzip the Canal package

[root@r26聽soft]#聽mkdir聽/opt/canal聽&&聽tar聽vxzf聽canal.deployer-1.1.4.tar.gz聽-C聽/opt/canal

Deploy jre of Canal

[root@r26聽soft]#聽tar聽vxzf聽jre1.8.0_281.tar.gz [root@r26聽soft]#聽mv聽jre1.8.0_281聽/opt/canal/jre ##聽configue聽jre,聽add聽bellowing聽line聽in聽the聽head聽of聽/opt/canal/bin/startup.sh聽 JAVA=/opt/canal/jre/bin/java

Modify the configuration file of Canal

modify聽/opt/canal/conf/canal.properties聽configuration file ##聽modify聽bellowing聽configuration canal.zkServers聽=192.168.12.24:2181 canal.serverMode聽=聽kafka canal.destinations聽=聽example聽聽聽聽聽聽聽聽##  Need in  / opt/canal/conf   Create a directory   example   Folder for storing   destination   Configuration of canal.mq.servers聽=聽192.168.12.22:9092 modify聽/opt/canal/conf/example/instance.properties聽configuration file ##聽modify聽bellowing聽configuration canal.instance.master.address=192.168.12.25:3306 canal.instance.dbUsername=canal canal.instance.dbPassword=canal canal.instance.filter.regex=.*\\..*聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽##  Filter database tables canal.mq.topic=canal-kafka Configure data flow

MySQL binlog - > canal - > Kafka channel

View MySQL Binlog information

Check the MySQL Binlog information to ensure that the Binlog is normal

mysql>聽show聽master聽status; +---------------+----------+--------------+------------------+-------------------+ |聽File聽聽聽聽聽聽聽聽聽聽|聽Position聽|聽Binlog_Do_DB聽|聽Binlog_Ignore_DB聽|聽Executed_Gtid_Set聽| +---------------+----------+--------------+------------------+-------------------+ |聽binlog.000001聽|聽聽聽聽聽2888聽|聽聽聽聽聽聽聽聽聽聽聽聽聽聽|聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽|聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽| +---------------+----------+--------------+------------------+-------------------+ 1聽row聽in聽set聽(0.00聽sec)

Create a Topic in Kafka

Create a topic Canal Kafka in Kafka. The name of this topic should correspond to Canal. MQ. Topic = Canal Kafka in the Canal configuration file / opt/canal/conf/example/instance.properties:

[root@r22聽kafka]#聽/opt/kafka/bin/kafka-topics.sh聽--create聽\ >聽--zookeeper聽192.168.12.24:2181聽\ >聽--config聽max.message.bytes=12800000聽\ >聽--config聽flush.messages=1聽\ >聽--replication-factor聽1聽\ >聽--partitions聽1聽\ >聽--topic聽canal-kafka Created聽topic聽canal-kafka. [2021-02-24聽01:51:55,050]聽INFO聽[ReplicaFetcherManager聽on聽broker聽0]聽Removed聽fetcher聽for聽partitions聽Set(canal-kafka-0)聽(kafka.server.ReplicaFetcherManager) [2021-02-24聽01:51:55,052]聽INFO聽[Log聽partition=canal-kafka-0,聽dir=/opt/kafka/logs]聽Loading聽producer聽state聽till聽offset聽0聽with聽message聽format聽version聽2聽(kafka.log.Log) [2021-02-24聽01:51:55,053]聽INFO聽Created聽log聽for聽partition聽canal-kafka-0聽in聽/opt/kafka/logs/canal-kafka-0聽with聽properties聽{compression.type聽->聽producer,聽message.downconversion.enable聽->聽true,聽min.insync.replicas聽->聽1,聽segment.jitter.ms聽->聽0,聽cleanup.policy聽->聽[delete],聽flush.ms聽->聽9223372036854775807,聽segment.bytes聽->聽1073741824,聽retention.ms聽->聽604800000,聽flush.messages聽->聽1,聽message.format.version聽->聽2.7-IV2,聽file.delete.delay.ms聽->聽60000,聽max.compaction.lag.ms聽->聽9223372036854775807,聽max.message.bytes聽->聽12800000,聽min.compaction.lag.ms聽->聽0,聽message.timestamp.type聽->聽CreateTime,聽preallocate聽->聽false,聽min.cleanable.dirty.ratio聽->聽0.5,聽index.interval.bytes聽->聽4096,聽unclean.leader.election.enable聽->聽false,聽retention.bytes聽->聽-1,聽delete.retention.ms聽->聽86400000,聽segment.ms聽->聽604800000,聽message.timestamp.difference.max.ms聽->聽9223372036854775807,聽segment.index.bytes聽->聽10485760}.聽(kafka.log.LogManager) [2021-02-24聽01:51:55,053]聽INFO聽[Partition聽canal-kafka-0聽broker=0]聽No聽checkpointed聽highwatermark聽is聽found聽for聽partition聽canal-kafka-0聽(kafka.cluster.Partition) [2021-02-24聽01:51:55,053]聽INFO聽[Partition聽canal-kafka-0聽broker=0]聽Log聽loaded聽for聽partition聽canal-kafka-0聽with聽initial聽high聽watermark聽0聽(kafka.cluster.Partition)

View all topics in Kafka:

[root@r22聽kafka]#聽/opt/kafka/bin/kafka-topics.sh聽--list聽--zookeeper聽192.168.12.24:2181 __consumer_offsets canal-kafka ticdc-test

View the information of topic ticdc test in Kafka:

[root@r22聽~]#聽/opt/kafka/bin/kafka-topics.sh聽--describe聽--zookeeper聽192.168.12.24:2181聽聽--topic聽canal-kafka Topic:聽ticdc-test聽聽聽聽聽聽聽PartitionCount:聽1聽聽聽聽聽聽聽ReplicationFactor:聽1聽聽聽聽Configs:聽max.message.bytes=12800000,flush.messages=1 聽聽聽聽聽聽聽聽Topic:聽ticdc-test聽聽聽聽聽聽聽Partition:聽0聽聽聽聽Leader:聽0聽聽聽聽聽聽聽Replicas:聽0聽聽聽聽聽Isr:聽0

8.1.3 start Canal

Before starting the Canal, you need to check the port on the Canal node:

##聽check聽MySQL聽3306聽port ##聽canal.instance.master.address=192.168.12.25:3306 [root@r26聽bin]#聽telnet聽192.168.12.25聽3306 ##聽check聽Kafka聽9092聽port ##聽canal.mq.servers聽=聽192.168.12.22:9092 [root@r26聽bin]#聽telnet聽192.168.12.22聽9092 ##聽check聽zookeeper聽2181聽port ##聽canal.zkServers聽=聽192.168.12.24:2181 [root@r26聽bin]#聽telnet聽192.168.12.24聽2181

Start Canal:

[root@r26聽bin]#聽/opt/canal/bin/startup.sh cd聽to聽/opt/canal/bin聽for聽workaround聽relative聽path LOG聽CONFIGURATION聽:聽/opt/canal/bin/../conf/logback.xml canal聽conf聽:聽/opt/canal/bin/../conf/canal.properties CLASSPATH聽:/opt/canal/bin/../conf:/opt/canal/bin/../lib/zookeeper-3.4.5.jar:/opt/canal/bin/../lib/zkclient-0.10.jar:/opt/canal/bin/../lib/spring-tx-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-orm-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-jdbc-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-expression-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-core-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-context-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-beans-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/spring-aop-3.2.18.RELEASE.jar:/opt/canal/bin/../lib/snappy-java-1.1.7.1.jar:/opt/canal/bin/../lib/snakeyaml-1.19.jar:/opt/canal/bin/../lib/slf4j-api-1.7.12.jar:/opt/canal/bin/../lib/simpleclient_pushgateway-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_httpserver-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_hotspot-0.4.0.jar:/opt/canal/bin/../lib/simpleclient_common-0.4.0.jar:/opt/canal/bin/../lib/simpleclient-0.4.0.jar:/opt/canal/bin/../lib/scala-reflect-2.11.12.jar:/opt/canal/bin/../lib/scala-logging_2.11-3.8.0.jar:/opt/canal/bin/../lib/scala-library-2.11.12.jar:/opt/canal/bin/../lib/rocketmq-srvutil-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-remoting-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-logging-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-common-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-client-4.5.2.jar:/opt/canal/bin/../lib/rocketmq-acl-4.5.2.jar:/opt/canal/bin/../lib/protobuf-java-3.6.1.jar:/opt/canal/bin/../lib/oro-2.0.8.jar:/opt/canal/bin/../lib/netty-tcnative-boringssl-static-1.1.33.Fork26.jar:/opt/canal/bin/../lib/netty-all-4.1.6.Final.jar:/opt/canal/bin/../lib/netty-3.2.2.Final.jar:/opt/canal/bin/../lib/mysql-connector-java-5.1.47.jar:/opt/canal/bin/../lib/metrics-core-2.2.0.jar:/opt/canal/bin/../lib/lz4-java-1.4.1.jar:/opt/canal/bin/../lib/logback-core-1.1.3.jar:/opt/canal/bin/../lib/logback-classic-1.1.3.jar:/opt/canal/bin/../lib/kafka-clients-1.1.1.jar:/opt/canal/bin/../lib/kafka_2.11-1.1.1.jar:/opt/canal/bin/../lib/jsr305-3.0.2.jar:/opt/canal/bin/../lib/jopt-simple-5.0.4.jar:/opt/canal/bin/../lib/jctools-core-2.1.2.jar:/opt/canal/bin/../lib/jcl-over-slf4j-1.7.12.jar:/opt/canal/bin/../lib/javax.annotation-api-1.3.2.jar:/opt/canal/bin/../lib/jackson-databind-2.9.6.jar:/opt/canal/bin/../lib/jackson-core-2.9.6.jar:/opt/canal/bin/../lib/jackson-annotations-2.9.0.jar:/opt/canal/bin/../lib/ibatis-sqlmap-2.3.4.726.jar:/opt/canal/bin/../lib/httpcore-4.4.3.jar:/opt/canal/bin/../lib/httpclient-4.5.1.jar:/opt/canal/bin/../lib/h2-1.4.196.jar:/opt/canal/bin/../lib/guava-18.0.jar:/opt/canal/bin/../lib/fastsql-2.0.0_preview_973.jar:/opt/canal/bin/../lib/fastjson-1.2.58.jar:/opt/canal/bin/../lib/druid-1.1.9.jar:/opt/canal/bin/../lib/disruptor-3.4.2.jar:/opt/canal/bin/../lib/commons-logging-1.1.3.jar:/opt/canal/bin/../lib/commons-lang3-3.4.jar:/opt/canal/bin/../lib/commons-lang-2.6.jar:/opt/canal/bin/../lib/commons-io-2.4.jar:/opt/canal/bin/../lib/commons-compress-1.9.jar:/opt/canal/bin/../lib/commons-codec-1.9.jar:/opt/canal/bin/../lib/commons-cli-1.2.jar:/opt/canal/bin/../lib/commons-beanutils-1.8.2.jar:/opt/canal/bin/../lib/canal.store-1.1.4.jar:/opt/canal/bin/../lib/canal.sink-1.1.4.jar:/opt/canal/bin/../lib/canal.server-1.1.4.jar:/opt/canal/bin/../lib/canal.protocol-1.1.4.jar:/opt/canal/bin/../lib/canal.prometheus-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.driver-1.1.4.jar:/opt/canal/bin/../lib/canal.parse.dbsync-1.1.4.jar:/opt/canal/bin/../lib/canal.parse-1.1.4.jar:/opt/canal/bin/../lib/canal.meta-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.spring-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.manager-1.1.4.jar:/opt/canal/bin/../lib/canal.instance.core-1.1.4.jar:/opt/canal/bin/../lib/canal.filter-1.1.4.jar:/opt/canal/bin/../lib/canal.deployer-1.1.4.jar:/opt/canal/bin/../lib/canal.common-1.1.4.jar:/opt/canal/bin/../lib/aviator-2.2.1.jar:/opt/canal/bin/../lib/aopalliance-1.0.jar: cd聽to聽/opt/canal/bin聽for聽continue

View Canal log

View / opt/canal/logs/example/example.log

2021-02-24聽01:41:40.293聽[destination聽=聽example聽,聽address聽=聽/192.168.12.25:3306聽,聽EventParser]聽WARN聽聽c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy聽-聽--->聽begin聽to聽find聽start聽position,聽it聽will聽be聽long聽time聽for聽reset聽or聽first聽position 2021-02-24聽01:41:40.293聽[destination聽=聽example聽,聽address聽=聽/192.168.12.25:3306聽,聽EventParser]聽WARN聽聽c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy聽-聽prepare聽to聽find聽start聽position聽just聽show聽master聽status 2021-02-24聽01:41:40.542聽[destination聽=聽example聽,聽address聽=聽/192.168.12.25:3306聽,聽EventParser]聽WARN聽聽c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy聽-聽--->聽find聽start聽position聽successfully,聽EntryPosition[included=false,journalName=binlog.000001,position=4,serverId=1,gtid=,timestamp=1614134832000]聽cost聽:聽244ms聽,聽the聽next聽step聽is聽binlog聽dump

View consumer information in Kafka

Insert a test message into MySQL:

mysql>聽insert聽into聽t2聽values(1); Query聽OK,聽1聽row聽affected聽(0.00聽sec)

Check the information of the consumer, and you have the test data just inserted:

/opt/kafka/bin/kafka-console-consumer.sh聽--bootstrap-server聽192.168.12.22:9092聽--topic聽canal-kafka聽--from-beginning {"data":null,"database":"test","es":1614151725000,"id":2,"isDdl":false,"mysqlType":null,"old":null,"pkNames":null,"sql":"create聽database聽test","sqlType":null,"table":"","ts":1614151725890,"type":"QUERY"} {"data":null,"database":"test","es":1614151746000,"id":3,"isDdl":true,"mysqlType":null,"old":null,"pkNames":null,"sql":"create聽table聽t2(id聽int)","sqlType":null,"table":"t2","ts":1614151746141,"type":"CREATE"} {"data":[{"id":"1"}],"database":"test","es":1614151941000,"id":4,"isDdl":false,"mysqlType":{"id":"int"},"old":null,"pkNames":null,"sql":"","sqlType":{"id":4},"table":"t2","ts":1614151941235,"type":"INSERT"}

Kafka - > Flink path

Create the t2 table in Flink, and the connector type is kafka

##聽create聽a聽test聽table聽t2聽in聽Flink Flink聽SQL>聽create聽table聽t2(id聽int) >聽WITH聽( >聽聽'connector'聽=聽'kafka', >聽聽'topic'聽=聽'canal-kafka', >聽聽'properties.bootstrap.servers'聽=聽'192.168.12.22:9092', >聽聽'properties.group.id'聽=聽'canal-kafka-consumer-group', >聽聽'format'聽=聽'canal-json', >聽聽'scan.startup.mode'聽=聽'latest-offset' >聽); Flink聽SQL>聽select聽*聽from聽t1;

Insert a test data in MySQL:

mysql>聽insert聽into聽test.t2聽values(2); Query聽OK,聽1聽row聽affected聽(0.00聽sec)

Sync data in real time from Flink:

Flink聽SQL>聽select聽*聽from聽t1; 聽Refresh:聽1聽s聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Page:聽Last聽of聽1聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽Updated:聽02:49:27.366 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽id 聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽聽2

Flink - > tidb path

Create a table for testing in the downstream TiDB

[root@r20聽soft]#聽mysql聽-uroot聽-P14000聽-hr21 mysql>聽create聽table聽t3聽(id聽int); Query聽OK,聽0聽rows聽affected聽(0.31聽sec)

Create a test table in Flink

Flink聽SQL>聽CREATE聽TABLE聽t3聽( >聽聽聽聽聽id聽int >聽)聽with聽( >聽聽聽聽聽'connector'聽=聽'jdbc', >聽聽聽聽聽'url'聽=聽'jdbc:mysql://192.168.12.21:14000/test', >聽聽聽聽聽'table-name'聽=聽't3', >聽聽聽聽聽'username'聽=聽'root', >聽聽聽聽聽'password'聽=聽'mysql' >聽); Flink聽SQL>聽insert聽into聽t3聽values(3); [INFO]聽Submitting聽SQL聽update聽statement聽to聽the聽cluster... [INFO]聽Table聽update聽statement聽has聽been聽successfully聽submitted聽to聽the聽cluster: Job聽ID:聽a0827487030db177ee7e5c8575ef714e

View the inserted data in the downstream TiDB

mysql>聽select聽*聽from聽test.t3; +------+ |聽id聽聽聽| +------+ |聽聽聽聽3聽| +------+ 1聽row聽in聽set聽(0.00聽sec)


【本文地址】

公司简介

联系我们

今日新闻

    推荐新闻

    专题文章
      CopyRight 2018-2019 实验室设备网 版权所有